package fun.easycode.datastream;

import cn.hutool.core.collection.CollectionUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import fun.easycode.jointblock.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;

import java.util.List;
import java.util.stream.Collectors;

/**
 * 数据消费者，每一个报表一个
 * @author xuzhen97
 */
@Slf4j
public class DataEntryConsumer<T> {
    private final IDataProcessor<T> processor;
    private final DefaultMQPushConsumer consumer;
    private final DataStreamProperties properties;

    /**
     * 构造函数
     * @param processor 数据处理器
     * @param properties canal配置
     * @throws MQClientException 创建异常
     */
    public DataEntryConsumer(IDataProcessor<T> processor, DataStreamProperties properties) throws MQClientException {
        this.processor = processor;
        this.properties = properties;
        consumer = createConsumer();
    }

    /**
     * 启动消费者
     * @throws MQClientException 启动异常
     */
    public void start() throws MQClientException {
        consumer.start();
    }

    /**
     * 创建消费者
     * @return 消费者
     * @throws MQClientException 创建异常
     */
    private DefaultMQPushConsumer createConsumer() throws MQClientException {
        // 初始化consumer，并设置consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(processor.getMark());

        // 设置NameServer地址
        consumer.setNamesrvAddr(properties.getRocketMq().getNameSrvAddr());
        //订阅一个或多个topic，并指定tag过滤条件，这里指定*表示接收所有tag的消息
        consumer.subscribe(properties.getRocketMq().getDefaultTopic(), processor.getMark());
        //注册回调接口来处理从Broker中收到的消息

        consumer.registerMessageListener((MessageListenerOrderly) (msgList, consumeOrderlyContext) -> {

            List<DataEntry<T>> data = msgList.stream()
                    .map(msg-> JacksonUtil.readValue(new String(msg.getBody()), new TypeReference<DataEntry<T>>() {
                    }))
                    .collect(Collectors.toList());

            List<T> copeData = data.stream().filter(entry -> entry.getDataMode() == DataMode.COPE)
                    .map(DataEntry::getData).collect(Collectors.toList());
            List<T> cleanData = data.stream().filter(entry -> entry.getDataMode() == DataMode.CLEAN_UP)
                    .map(DataEntry::getData).collect(Collectors.toList());
            try {

                // 处理数据
                if(CollectionUtil.isNotEmpty(copeData)){
                    String json = JacksonUtil.toJson(copeData);
                    DataContext.tryLock(processor, ()-> processor.process(copeData));
                    log.info("处理数据：{}", json);
                }

                // 清理数据
                if(CollectionUtil.isNotEmpty(cleanData)){
                    String json = JacksonUtil.toJson(cleanData);
                    DataContext.tryLock(processor, ()-> processor.cleanUp(cleanData));
                    log.info("清理数据：{}", json);
                }
                // 返回成功
                return ConsumeOrderlyStatus.SUCCESS;
            } catch (Exception e) {
                log.error("处理数据失败", e);
                // 返回失败，稍后重试
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        });

        return consumer;
    }
}
